package flinkstudy.stream.source.mock;

import flinkstudy.Constant;
import flinkstudy.bo.EstateInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Fixed, finite data source.
 *
 * <p>Reads the text file located at {@code Constant.FilePath.ESTATE_CSV_PATH} and converts
 * every comma-separated line into an {@link EstateInfo}.
 *
 * <p>Column layout of one line (zero-based):
 * <ol start="0">
 *   <li>price</li>
 *   <li>city id</li>
 *   <li>district id</li>
 *   <li>house id</li>
 *   <li>plate id</li>
 *   <li>creation time (kept as the raw text of the column)</li>
 * </ol>
 *
 * @author daocr
 * @date 2019/12/14
 */
public class FixedLimitedData {

    /** Zero-based column positions inside one comma-separated line. */
    private static final int COL_PRICE = 0;
    private static final int COL_CITY_ID = 1;
    private static final int COL_DISTRICT_ID = 2;
    private static final int COL_HOUSE_ID = 3;
    private static final int COL_PLATE_ID = 4;
    private static final int COL_CREATE_TIME = 5;

    /** Minimum number of columns a line must have; additional columns are ignored. */
    private static final int MIN_COLUMNS = 6;

    /**
     * Builds a stream of {@link EstateInfo} records from the estate CSV file.
     *
     * @param env execution environment used to create the file source
     * @return stream with one {@link EstateInfo} per line of the file
     */
    public static SingleOutputStreamOperator<EstateInfo> of(StreamExecutionEnvironment env) {

        DataStreamSource<String> lines = env.readTextFile(Constant.FilePath.ESTATE_CSV_PATH, "utf-8");

        // Kept as a lambda that delegates to a static helper: nothing is captured.
        return lines.map(line -> parseLine(line));
    }

    /**
     * Converts one comma-separated line into an {@link EstateInfo}.
     *
     * @param line raw line read from the file
     * @return the populated record
     * @throws IllegalArgumentException if the line has fewer than {@value #MIN_COLUMNS} columns
     *                                  or one of the numeric columns is not a valid number
     */
    private static EstateInfo parseLine(String line) {
        String[] columns = line.split(",");

        // Fail with the offending line instead of a bare ArrayIndexOutOfBoundsException.
        if (columns.length < MIN_COLUMNS) {
            throw new IllegalArgumentException(
                    "Malformed estate CSV line: expected at least " + MIN_COLUMNS
                            + " columns but found " + columns.length + ": '" + line + "'");
        }

        final Integer cityId;
        final Integer districtId;
        final Integer houseId;
        final Integer plateId;
        final Long price;
        try {
            // Numeric columns are trimmed so that padding such as "1, 2, 3" does not break parsing.
            cityId = Integer.valueOf(columns[COL_CITY_ID].trim());
            districtId = Integer.valueOf(columns[COL_DISTRICT_ID].trim());
            houseId = Integer.valueOf(columns[COL_HOUSE_ID].trim());
            plateId = Integer.valueOf(columns[COL_PLATE_ID].trim());
            price = Long.valueOf(columns[COL_PRICE].trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Malformed estate CSV line: non-numeric value in a numeric column: '" + line + "'", e);
        }

        EstateInfo estateInfo = new EstateInfo();
        estateInfo.setCityId(cityId);
        estateInfo.setDistrictId(districtId);
        estateInfo.setHouseId(houseId);
        estateInfo.setPlateId(plateId);
        estateInfo.setPrice(price);
        // The time column is passed through untouched, exactly as it appears in the file.
        estateInfo.setCreateTimeFormat(columns[COL_CREATE_TIME]);
        return estateInfo;
    }

}
